package com.crazymakercircle.completableFutureDemo;

import com.crazymakercircle.util.Print;
import com.crazymakercircle.util.TimeUtil;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * Demo tests that fan out many slow tasks with {@link CompletableFuture} and gather
 * their list results into a single future.
 *
 * <p>Two aggregation strategies are provided: {@link #sequenceByAllOf(List)} and
 * {@link #sequenceByThenCombine(List)}. Both flatten the per-task lists in input order,
 * skip {@code null} elements and treat a {@code null} list as empty.
 */
public class CompletableFutureTestV2 {

    /** Number of tasks each test submits. */
    private static final int TASK_COUNT = 1000;

    /**
     * Executor used by {@link #test1()}: 5 core threads, at most 10 threads, 100 s
     * keep-alive, bounded queue of 1024 tasks.
     *
     * <p>A {@link ThreadPoolExecutor} only grows past its core size once the queue is
     * full, so the {@value #TASK_COUNT} tasks of {@code test1} (fewer than the queue
     * capacity) are served by the 5 core threads. Nothing in this class shuts the pool
     * down.
     */
    public static final ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(
                                                    5,
                                                    10,
                                                    100,
                                                    TimeUnit.SECONDS,
                                                    new LinkedBlockingDeque<>(1024));

    // Alternative sizing (15 core / 15 max threads); presumably kept for comparison runs.
//    public static final ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(
//                                                    15,
//                                                    15,
//                                                    100,
//                                                    TimeUnit.SECONDS,
//                                                    new LinkedBlockingDeque<>(1024));

    /**
     * Combines the given futures with {@link CompletableFuture#allOf}.
     *
     * @param futures futures to wait for; must not be {@code null}
     * @param <T>     element type of the produced lists
     * @return a future that completes when every input has completed, holding all
     *         non-null elements in input order; it completes exceptionally if any
     *         input does
     * @throws NullPointerException if {@code futures} is {@code null}
     */
    public static <T> CompletableFuture<List<T>> sequenceByAllOf(List<CompletableFuture<List<T>>> futures) {
        Objects.requireNonNull(futures, "futures");
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(
                        // Every input is already complete here, so join() does not block.
                        v -> futures.stream()
                                .flatMap(future -> nonNullElements(future.join()))
                                .collect(Collectors.toList())
                );
    }

    /**
     * Combines the given futures by chaining {@link CompletableFuture#thenCombine}.
     *
     * <p>Produces the same elements as {@link #sequenceByAllOf(List)}: non-null elements
     * only, in input order. The returned future is always a new one, even for a single
     * input.
     *
     * @param completableFutures futures to combine; must not be {@code null}
     * @param <T>                element type of the produced lists
     * @return a future holding the merged list; an already-completed future with an
     *         empty list when {@code completableFutures} is empty
     * @throws NullPointerException if {@code completableFutures} is {@code null}
     */
    public static <T> CompletableFuture<List<T>> sequenceByThenCombine(List<CompletableFuture<List<T>>> completableFutures) {
        Objects.requireNonNull(completableFutures, "completableFutures");
        // Seeding with an empty result routes every input (including a lone one)
        // through the same null-filtering merge step.
        CompletableFuture<List<T>> combined =
                CompletableFuture.<List<T>>completedFuture(Collections.emptyList());
        for (CompletableFuture<List<T>> future : completableFutures) {
            combined = combined.thenCombine(
                    future,
                    (merged, next) -> Stream.concat(merged.stream(), nonNullElements(next))
                            .collect(Collectors.toList()));
        }
        return combined;
    }

    /** Streams the non-null elements of {@code list}; a {@code null} list yields an empty stream. */
    private static <E> Stream<E> nonNullElements(List<E> list) {
        return list == null ? Stream.<E>empty() : list.stream().filter(Objects::nonNull);
    }

    /**
     * Simulated slow unit of work: logs start/end around a one second sleep, sleeps a
     * further 10 ms and returns a list holding {@code i} ten times.
     *
     * <p>If the thread is interrupted the remaining sleeps are skipped, the interrupt
     * flag is restored for the owning executor, and the list is still returned.
     */
    private static List<Integer> simulateTurn(int i) {
        try {
            Print.tcfo(TimeUtil.currentTime() + ">" + i + " turn start");
            TimeUnit.SECONDS.sleep(1);
            Print.tcfo(TimeUtil.currentTime() + ">" + i + " turn end");

            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
            // Catching InterruptedException clears the flag; set it again so the
            // thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return Stream.iterate(i, v -> v).limit(10).collect(Collectors.toList());
    }

    /** Waits for all futures, prints the merged result and the whole seconds elapsed since {@code start}. */
    private static void joinAndReport(List<CompletableFuture<List<Integer>>> futures, Instant start) {
        CompletableFuture<List<Integer>> cf = sequenceByAllOf(futures);
        System.out.println(cf.join());
        System.out.println("耗时：" + Duration.between(start, Instant.now()).getSeconds());
    }

    /** Runs {@value #TASK_COUNT} simulated tasks on {@link #THREAD_POOL} and reports the elapsed time. */
    @Test
    public void test1() {
        Instant start = Instant.now();
        List<CompletableFuture<List<Integer>>> futures = IntStream.range(0, TASK_COUNT)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> simulateTurn(i), THREAD_POOL))
                .collect(Collectors.toList());
        joinAndReport(futures, start);
    }

    /**
     * Runs {@value #TASK_COUNT} simulated tasks on the default asynchronous executor of
     * {@link CompletableFuture} (normally {@code ForkJoinPool.commonPool()}) and reports
     * the elapsed time.
     */
    @Test
    public void test2() {
        Instant start = Instant.now();
        List<CompletableFuture<List<Integer>>> futures = IntStream.range(0, TASK_COUNT)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> simulateTurn(i)))
                .collect(Collectors.toList());
        joinAndReport(futures, start);
    }
}
